package cmd

import (
	"fmt"
	sarama "github.com/Shopify/sarama"
	"sync"
)

var wg sync.WaitGroup

func  TestKafkaProducer(){
     config := sarama.NewConfig()
     config.Producer.RequiredAcks = sarama.WaitForAll          // 发送完数据需要leader和follow都确认
     config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出一个partition
     config.Producer.Return.Successes = true                   // 成功交付的消息将在success channel返回

     // 构造一个消息
     msg := &sarama.ProducerMessage{}
     msg.Topic = "test"
     msg.Key = sarama.StringEncoder("testkey")
     msg.Value = sarama.StringEncoder("this is a test log")
     // 连接kafka
     client, err := sarama.NewSyncProducer([]string{"10.0.72.202:9092"}, config)
     if err != nil {
		 fmt.Println("producer closed, err:", err)
		 return
     }
     defer client.Close()
     // 发送消息
     pid, offset, err := client.SendMessage(msg)
     if err != nil {
		 fmt.Println("send msg failed, err:", err)
		 return
     }
     fmt.Printf("pid:%v offset:%v\n", pid, offset)
}

func  TestKafkaConsumer(){
     consumer, err := sarama.NewConsumer([]string{"10.0.72.202:9092"}, nil)
     if err != nil {
        fmt.Printf("fail to start consumer, err:%v\n", err)
        return
     }
     partitionList, err := consumer.Partitions("test") // 根据topic取到所有的分区
     if err != nil {
     	 fmt.Printf("fail to get list of partition:err%v\n", err)
         return
     }
     fmt.Println(partitionList)
     defer consumer.Close()
     for partition := range partitionList { // 遍历所有的分区
		 // 针对每个分区创建一个对应的分区消费者
		 pc, err := consumer.ConsumePartition("test", int32(partition), sarama.OffsetOldest)
		 if err != nil {
			 fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err)
			 return
		 }
		 //defer pc.AsyncClose()
		 // 异 步从每个分区消费信息
		 wg.Add(1)
		 go func() {
			 for msg :=  range pc.Messages() {
				 fmt.Printf("Partition:%d Offset:%d Key:%s Value:%s \n", msg.Partition, msg.Offset, msg.Key, msg.Value)
			 }
			 wg.Done()
		 }()
     }
	wg.Wait()
}